26.2 异步子Agent任务

26 分钟阅读

异步子Agent任务概述#

异步子Agent任务是Claude Code中处理长时间运行任务的重要机制。通过异步执行,子Agent可以在后台执行任务,主Agent可以继续处理其他请求,从而提高整体效率和响应速度。

异步任务的基本概念#

1. 同步 vs 异步#

示例:同步 vs 异步

同步执行:

```python
def process_task(task):
    """同步处理任务"""
    result1 = agent1.execute(task)
    result2 = agent2.execute(result1)
    result3 = agent3.execute(result2)
    return result3

# 执行时间: 10 + 15 + 20 = 45秒
```

异步执行:

```python
async def process_task_async(task):
    """异步处理任务"""
    result1 = await agent1.execute_async(task)
    result2 = await agent2.execute_async(result1)
    result3 = await agent3.execute_async(result2)
    return result3

# 注意: 这三步存在数据依赖, 只能依次 await, 总耗时仍约 45 秒(等待期间事件循环可以处理其他任务)。
# 只有相互独立的任务通过 asyncio.gather 并发执行时, 总耗时才接近最慢的一个任务(约 20 秒)。
```

### 2. 异步任务的优势

| 特性 | 同步任务 | 异步任务 |
|------|----------|----------|
| 执行方式 | 顺序执行 | 并行执行 |
| 响应时间 | 较长 | 较短 |
| 资源利用率 | 较低 | 较高 |
| 用户体验 | 需要等待 | 响应及时 |
| 复杂度 | 较低 | 较高 |

## 异步子Agent实现

### 1. 基础异步子Agent

示例:基础异步子Agent

用户请求:
"创建一个异步代码生成子Agent"

Claude Code 生成的代码:

```python
import asyncio
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)

class AsyncCodeGenerationAgent:
    """异步代码生成子Agent"""

    def __init__(self, context):
        self.context = context
        self.task_queue = asyncio.Queue()
        self.results = {}
        self.running = False

    async def start(self):
        """启动Agent"""
        self.running = True
        logger.info("AsyncCodeGenerationAgent started")

        # 启动任务处理器
        asyncio.create_task(self._process_tasks())

    async def stop(self):
        """停止Agent"""
        self.running = False
        logger.info("AsyncCodeGenerationAgent stopped")

    async def submit_task(self, task_id: str, task: Dict[str, Any]):
        """提交任务"""
        await self.task_queue.put((task_id, task))
        logger.info(f"Task {task_id} submitted")

    async def get_result(self, task_id: str) -> Dict[str, Any]:
        """获取任务结果"""
        while task_id not in self.results:
            await asyncio.sleep(0.1)
        return self.results[task_id]

    async def _process_tasks(self):
        """处理任务"""
        while self.running:
            try:
                task_id, task = await asyncio.wait_for(
                    self.task_queue.get(),
                    timeout=1.0
                )

                logger.info(f"Processing task {task_id}")

                # 处理任务
                result = await self._generate_code(task)

                # 保存结果
                self.results[task_id] = result

                logger.info(f"Task {task_id} completed")

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"Error processing task: {e}")

    async def _generate_code(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """生成代码"""
        # 模拟长时间运行的任务
        await asyncio.sleep(2)

        code = f"""
# Generated code for {task['name']}

def {task['function_name']}():
    pass
"""

        return {
            'code': code,
            'status': 'completed',
            'task_id': task.get('task_id')
        }

# 使用示例
async def main():
    """主函数"""
    agent = AsyncCodeGenerationAgent({})

    # 启动Agent
    await agent.start()

    # 提交任务
    task1 = {'name': 'Task 1', 'function_name': 'func1'}
    task2 = {'name': 'Task 2', 'function_name': 'func2'}
    task3 = {'name': 'Task 3', 'function_name': 'func3'}

    await agent.submit_task('task1', task1)
    await agent.submit_task('task2', task2)
    await agent.submit_task('task3', task3)

    # 获取结果
    result1 = await agent.get_result('task1')
    result2 = await agent.get_result('task2')
    result3 = await agent.get_result('task3')

    print("Task 1 result:", result1['status'])
    print("Task 2 result:", result2['status'])
    print("Task 3 result:", result3['status'])

    # 停止Agent
    await agent.stop()

if __name__ == '__main__':
    asyncio.run(main())
```

> **异步子Agent特点**:
> - 使用asyncio实现异步处理
> - 任务队列管理
> - 结果存储和检索
> - 后台任务处理

### 2. 高级异步子Agent

示例:高级异步子Agent

用户请求:
"创建一个支持任务优先级和超时的异步子Agent"

Claude Code 生成的代码:

```python
import asyncio
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import heapq
import logging

logger = logging.getLogger(__name__)

class AsyncPriorityAgent:
    """支持优先级的异步子Agent"""

    def __init__(self, context, max_workers=4):
        self.context = context
        self.task_queue = []
        self.task_lock = asyncio.Lock()
        self.results = {}
        self.running = False
        self.max_workers = max_workers
        self.workers = []
        self.task_stats = {
            'submitted': 0,
            'completed': 0,
            'failed': 0,
            'timeout': 0
        }

    async def start(self):
        """启动Agent"""
        self.running = True
        logger.info(f"AsyncPriorityAgent started with {self.max_workers} workers")

        # 启动工作线程
        for i in range(self.max_workers):
            worker = asyncio.create_task(self._worker(f"worker-{i}"))
            self.workers.append(worker)

    async def stop(self):
        """停止Agent"""
        self.running = False

        # 等待所有工作线程完成
        await asyncio.gather(*self.workers, return_exceptions=True)

        logger.info("AsyncPriorityAgent stopped")

    async def submit_task(
        self,
        task_id: str,
        task: Dict[str, Any],
        priority: int = 0,
        timeout: Optional[float] = None
    ):
        """提交任务"""
        task_data = {
            'task_id': task_id,
            'task': task,
            'priority': priority,
            'timeout': timeout,
            'submitted_at': datetime.utcnow(),
            'status': 'pending'
        }

        async with self.task_lock:
            heapq.heappush(self.task_queue, (-priority, task_data))
            self.task_stats['submitted'] += 1

        logger.info(f"Task {task_id} submitted with priority {priority}")

    async def get_result(self, task_id: str, timeout: float = 30.0) -> Dict[str, Any]:
        """获取任务结果"""
        start_time = datetime.utcnow()

        while True:
            if task_id in self.results:
                return self.results[task_id]

            # 检查超时
            elapsed = (datetime.utcnow() - start_time).total_seconds()
            if elapsed > timeout:
                raise TimeoutError(f"Task {task_id} timeout after {timeout}s")

            await asyncio.sleep(0.1)

    async def _worker(self, worker_name: str):
        """工作线程"""
        logger.info(f"{worker_name} started")

        while self.running:
            try:
                # 获取任务
                task_data = await self._get_task()

                if task_data is None:
                    await asyncio.sleep(0.1)
                    continue

                task_id = task_data['task_id']
                task = task_data['task']
                timeout = task_data.get('timeout')

                logger.info(f"{worker_name} processing task {task_id}")

                # 执行任务
                try:
                    if timeout:
                        result = await asyncio.wait_for(
                            self._execute_task(task),
                            timeout=timeout
                        )
                    else:
                        result = await self._execute_task(task)

                    self.results[task_id] = {
                        'result': result,
                        'status': 'completed',
                        'worker': worker_name,
                        'completed_at': datetime.utcnow()
                    }

                    self.task_stats['completed'] += 1
                    logger.info(f"{worker_name} completed task {task_id}")

                except asyncio.TimeoutError:
                    self.results[task_id] = {
                        'error': 'Task timeout',
                        'status': 'timeout',
                        'worker': worker_name
                    }

                    self.task_stats['timeout'] += 1
                    logger.warning(f"{worker_name} task {task_id} timeout")

                except Exception as e:
                    self.results[task_id] = {
                        'error': str(e),
                        'status': 'failed',
                        'worker': worker_name
                    }

                    self.task_stats['failed'] += 1
                    logger.error(f"{worker_name} task {task_id} failed: {e}")

            except Exception as e:
                logger.error(f"{worker_name} error: {e}")
                await asyncio.sleep(1)

        logger.info(f"{worker_name} stopped")

    async def _get_task(self) -> Optional[Dict[str, Any]]:
        """获取任务"""
        async with self.task_lock:
            if self.task_queue:
                _, task_data = heapq.heappop(self.task_queue)
                task_data['status'] = 'processing'
                return task_data
            return None

    async def _execute_task(self, task: Dict[str, Any]) -> Any:
        """执行任务"""
        task_type = task.get('type', 'default')

        if task_type == 'code_generation':
            return await self._generate_code(task)
        elif task_type == 'code_review':
            return await self._review_code(task)
        elif task_type == 'test_generation':
            return await self._generate_tests(task)
        else:
            return await self._default_task(task)

    async def _generate_code(self, task: Dict[str, Any]) -> str:
        """生成代码"""
        await asyncio.sleep(2)
        return f"""
# Generated code for {task['name']}

def {task['function_name']}():
    pass
"""

    async def _review_code(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """审查代码"""
        await asyncio.sleep(1.5)
        return {
            'issues': [],
            'suggestions': [],
            'metrics': {}
        }

    async def _generate_tests(self, task: Dict[str, Any]) -> str:
        """生成测试"""
        await asyncio.sleep(1)
        return """
import unittest

class TestGeneratedCode(unittest.TestCase):
    pass
"""

    async def _default_task(self, task: Dict[str, Any]) -> Any:
        """默认任务"""
        await asyncio.sleep(1)
        return {'result': 'completed'}

    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息"""
        return {
            'stats': self.task_stats.copy(),
            'queue_size': len(self.task_queue),
            'active_workers': len(self.workers)
        }

# 使用示例
async def main():
    """主函数"""
    agent = AsyncPriorityAgent({}, max_workers=4)

    # 启动Agent
    await agent.start()

    # 提交不同优先级的任务
    await agent.submit_task('task1', {'name': 'Low Priority', 'type': 'code_generation'}, priority=1)
    await agent.submit_task('task2', {'name': 'High Priority', 'type': 'code_review'}, priority=10)
    await agent.submit_task('task3', {'name': 'Medium Priority', 'type': 'test_generation'}, priority=5)
    await agent.submit_task('task4', {'name': 'Timeout Task', 'type': 'code_generation'}, priority=8, timeout=1.0)

    # 获取结果
    try:
        result1 = await agent.get_result('task1')
        print("Task 1 result:", result1['status'])
    except TimeoutError as e:
        print(f"Task 1 error: {e}")

    try:
        result2 = await agent.get_result('task2')
        print("Task 2 result:", result2['status'])
    except TimeoutError as e:
        print(f"Task 2 error: {e}")

    try:
        result3 = await agent.get_result('task3')
        print("Task 3 result:", result3['status'])
    except TimeoutError as e:
        print(f"Task 3 error: {e}")

    try:
        result4 = await agent.get_result('task4')
        print("Task 4 result:", result4['status'])
    except TimeoutError as e:
        print(f"Task 4 error: {e}")

    # 获取统计信息
    stats = agent.get_stats()
    print("\nStatistics:")
    print(f"  Submitted: {stats['stats']['submitted']}")
    print(f"  Completed: {stats['stats']['completed']}")
    print(f"  Failed: {stats['stats']['failed']}")
    print(f"  Timeout: {stats['stats']['timeout']}")
    print(f"  Queue size: {stats['queue_size']}")

    # 停止Agent
    await agent.stop()

if __name__ == '__main__':
    asyncio.run(main())
```

> **高级异步子Agent特点**:
> - 支持任务优先级
> - 支持任务超时
> - 多工作线程
> - 任务统计
> - 错误处理
## 异步任务模式
### 1. 任务编排模式
示例:任务编排

用户请求:
"实现一个任务编排器,协调多个异步子Agent"

Claude Code 生成的代码:

```python

import asyncio
from typing import Dict, Any, List
import logging

logger = logging.getLogger(__name__)

class TaskOrchestrator:
    """Run a workflow of interdependent tasks on registered agents.

    A workflow is a dict with a ``'tasks'`` list. Each task is a dict with
    the keys ``'id'``, ``'type'``, ``'agent'`` and ``'params'``, plus an
    optional ``'dependencies'`` list of task ids that must finish first.

    Tasks are started concurrently and each one waits only for its own
    dependencies, so the order of the ``'tasks'`` list does not matter and
    independent tasks overlap. Results are stored in ``self.results`` keyed
    by task id (insertion order is completion order).
    """

    # Task type -> name of the agent coroutine method that handles it.
    # Any other type falls back to ``_default_task``.
    _HANDLERS = {
        'code_generation': '_generate_code',
        'code_review': '_review_code',
        'test_generation': '_generate_tests',
    }

    def __init__(self, context):
        self.context = context
        self.agents = {}
        self.task_graph = {}
        self.results = {}
        # Task id -> asyncio.Event, set once that task has stored its result.
        # Rebuilt on every execute_workflow() call so results left over from
        # an earlier run cannot satisfy a dependency of the current run.
        self._completed = {}

    def register_agent(self, name: str, agent):
        """Make ``agent`` available to tasks whose ``'agent'`` is ``name``."""
        self.agents[name] = agent
        logger.info(f"Agent {name} registered")

    def define_workflow(self, workflow: Dict[str, Any]):
        """Store the workflow to be run by :meth:`execute_workflow`."""
        self.task_graph = workflow
        logger.info("Workflow defined")

    async def execute_workflow(self) -> Dict[str, Any]:
        """Run every task of the defined workflow and return all results.

        Returns:
            ``self.results``: task id -> value returned by the agent.

        Raises:
            KeyError: if the workflow has no ``'tasks'`` entry.
            ValueError: if a dependency is unknown, the dependencies form a
                cycle, or a task names an unregistered agent.
        """
        logger.info("Starting workflow execution")

        tasks = self.task_graph['tasks']
        # Fail fast instead of waiting forever on an impossible dependency.
        self._validate_dependencies(tasks)
        self._completed = {task['id']: asyncio.Event() for task in tasks}

        jobs = [asyncio.create_task(self._execute_task(task)) for task in tasks]
        try:
            await asyncio.gather(*jobs)
        finally:
            # If one task failed, its dependents would wait forever: cancel
            # whatever is still pending and let it unwind before returning.
            for job in jobs:
                if not job.done():
                    job.cancel()
            await asyncio.gather(*jobs, return_exceptions=True)

        logger.info("Workflow execution completed")

        return self.results

    def _validate_dependencies(self, tasks):
        """Raise ValueError for unknown or circular dependencies."""
        graph = {task['id']: list(task.get('dependencies', [])) for task in tasks}

        for task_id, needed in graph.items():
            for dep_id in needed:
                # A dependency outside this workflow is acceptable only when
                # its result is already stored.
                if dep_id not in graph and dep_id not in self.results:
                    raise ValueError(
                        f"Task {task_id} depends on unknown task {dep_id}"
                    )

        # Repeatedly remove tasks whose in-workflow dependencies are all
        # resolved; if nothing can be removed, the remainder forms a cycle.
        unresolved = {
            task_id: {dep_id for dep_id in needed if dep_id in graph}
            for task_id, needed in graph.items()
        }
        while unresolved:
            ready = [task_id for task_id, needed in unresolved.items() if not needed]
            if not ready:
                names = ', '.join(str(task_id) for task_id in unresolved)
                raise ValueError(f"Circular dependency among tasks: {names}")
            for task_id in ready:
                del unresolved[task_id]
            for needed in unresolved.values():
                needed.difference_update(ready)

    async def _execute_task(self, task: Dict[str, Any]):
        """Wait for the task's dependencies, run it and store its result.

        Raises:
            ValueError: if the task's agent is not registered or one of its
                dependencies can never complete.
        """
        task_id = task['id']
        task_type = task['type']
        agent_name = task['agent']
        dependencies = task.get('dependencies', [])

        logger.info(f"Executing task {task_id}")

        for dep_id in dependencies:
            await self._wait_for_task(dep_id)

        agent = self.agents.get(agent_name)
        if agent is None:
            raise ValueError(f"Agent {agent_name} not found")

        handler = getattr(agent, self._HANDLERS.get(task_type, '_default_task'))
        result = await handler(task['params'])

        self.results[task_id] = result

        # Release any tasks blocked on this one.
        finished = self._completed.get(task_id)
        if finished is not None:
            finished.set()

        logger.info(f"Task {task_id} completed")

    async def _wait_for_task(self, task_id: str):
        """Block until ``task_id`` has produced a result.

        Raises:
            ValueError: if ``task_id`` is not part of the running workflow
                and has no stored result, i.e. it could never complete.
        """
        finished = self._completed.get(task_id)
        if finished is not None:
            await finished.wait()
            return

        if task_id not in self.results:
            raise ValueError(f"Unknown dependency: {task_id}")

# 使用示例
async def main():
    """Demo: wire three agents into an orchestrator and run a six-task workflow."""
    orchestrator = TaskOrchestrator({})

    # Create the agents; the dict keys are the names tasks refer to.
    agents = {
        'code': AsyncCodeGenerationAgent({}),
        'review': AsyncCodeReviewAgent({}),
        'test': AsyncTestGenerationAgent({}),
    }

    # Start every agent first, then register them, in the same order.
    for agent in agents.values():
        await agent.start()
    for agent_name, agent in agents.items():
        orchestrator.register_agent(agent_name, agent)

    def step(task_id, task_type, agent_name, params, dependencies=None):
        """Build one workflow entry; 'dependencies' is omitted when not given."""
        entry = {
            'id': task_id,
            'type': task_type,
            'agent': agent_name,
            'params': params,
        }
        if dependencies is not None:
            entry['dependencies'] = dependencies
        return entry

    # Two generate -> review -> test chains, one per service.
    workflow = {
        'tasks': [
            step('task1', 'code_generation', 'code',
                 {'name': 'User Service', 'function_name': 'create_user'}),
            step('task2', 'code_generation', 'code',
                 {'name': 'Product Service', 'function_name': 'create_product'}),
            step('task3', 'code_review', 'review',
                 {'code': 'result from task1'}, ['task1']),
            step('task4', 'code_review', 'review',
                 {'code': 'result from task2'}, ['task2']),
            step('task5', 'test_generation', 'test',
                 {'code': 'result from task1'}, ['task1', 'task3']),
            step('task6', 'test_generation', 'test',
                 {'code': 'result from task2'}, ['task2', 'task4']),
        ]
    }

    orchestrator.define_workflow(workflow)

    # Run the workflow and report each task's status.
    results = await orchestrator.execute_workflow()

    print("Workflow results:")
    for task_id in results:
        outcome = results[task_id]
        print(f"  {task_id}: {outcome.get('status', 'unknown')}")

if __name__ == '__main__':
    asyncio.run(main())

```

> **任务编排特点**:
> - 定义任务依赖关系
> - 自动处理任务执行顺序
> - 支持并行执行
> - 结果收集和传递
### 2. 任务分发模式
示例:任务分发

用户请求:
"实现一个任务分发器,将任务分发给多个异步子Agent"

Claude Code 生成的代码:

```python

import asyncio
from typing import Dict, Any, List
import random
import logging

logger = logging.getLogger(__name__)

class TaskDispatcher:
    """Queue incoming tasks and hand each one to a registered agent.

    Tasks are dicts. The dispatch loop reads ``task['id']`` when submitting
    to an agent and ``task.get('type')`` when choosing one. Agents must
    provide an ``async submit_task(task_id, task)`` method and may provide
    ``can_handle(task_type)`` to claim specific task types.
    """

    def __init__(self, context, poll_interval: float = 1.0):
        """
        Args:
            context: Opaque value stored on the instance; not used here.
            poll_interval: Seconds the dispatch loop waits on an empty queue
                before re-checking ``self.running``. This bounds how long
                :meth:`stop` takes to return.
        """
        self.context = context
        self.agents = []
        self.task_queue = asyncio.Queue()
        self.running = False
        self.poll_interval = poll_interval
        # Strong reference to the background loop. The event loop keeps only
        # weak references to tasks, so an unreferenced task may be garbage
        # collected before it finishes.
        self._dispatch_task = None

    def register_agent(self, agent):
        """Add ``agent`` to the pool of dispatch targets."""
        self.agents.append(agent)
        logger.info(f"Agent registered, total: {len(self.agents)}")

    async def start(self):
        """Start the background dispatch loop; a no-op if already running."""
        if self._dispatch_task is not None and not self._dispatch_task.done():
            return

        self.running = True
        logger.info("TaskDispatcher started")

        self._dispatch_task = asyncio.create_task(self._dispatch_tasks())

    async def stop(self):
        """Ask the dispatch loop to exit and wait until it has done so.

        Tasks still in the queue are left there undelivered.
        """
        self.running = False

        worker, self._dispatch_task = self._dispatch_task, None
        # Never await ourselves if stop() is called from inside the loop.
        if worker is not None and worker is not asyncio.current_task():
            await asyncio.gather(worker, return_exceptions=True)

        logger.info("TaskDispatcher stopped")

    async def submit_task(self, task: Dict[str, Any]):
        """Enqueue ``task`` for dispatch."""
        await self.task_queue.put(task)
        logger.info(f"Task submitted: {task.get('id', 'unknown')}")

    async def _dispatch_tasks(self):
        """Dispatch queued tasks until ``self.running`` becomes false."""
        while self.running:
            try:
                # Wake up periodically so a stop request is noticed even
                # when the queue stays empty.
                task = await asyncio.wait_for(
                    self.task_queue.get(),
                    timeout=self.poll_interval
                )

                agent = self._select_agent(task)

                if agent:
                    await agent.submit_task(task['id'], task)
                    logger.info(f"Task {task['id']} dispatched to {agent.__class__.__name__}")
                else:
                    logger.warning(f"No available agent for task {task['id']}")

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                # Keep the loop alive; one bad task must not stop dispatching.
                logger.error(f"Error dispatching task: {e}")

    def _select_agent(self, task: Dict[str, Any]) -> Any:
        """Pick an agent for ``task``.

        Returns the first agent whose ``can_handle(task_type)`` is truthy,
        otherwise a random registered agent, or ``None`` when no agent is
        registered.
        """
        if not self.agents:
            return None

        task_type = task.get('type', 'default')

        for agent in self.agents:
            if hasattr(agent, 'can_handle') and agent.can_handle(task_type):
                return agent

        # Nobody claimed this task type: fall back to a random agent.
        return random.choice(self.agents)

class LoadBalancedDispatcher(TaskDispatcher):
    """Dispatcher that routes each task to the agent with the lowest load.

    The load of an agent is a counter kept in ``self.agent_loads``. It goes
    up by one per dispatched task and changes otherwise only through
    :meth:`update_agent_load`.
    """

    def __init__(self, context):
        super().__init__(context)
        # agent -> current load counter
        self.agent_loads = {}

    def register_agent(self, agent):
        """Register ``agent`` and start tracking its load at zero."""
        super().register_agent(agent)
        self.agent_loads[agent] = 0

    async def _dispatch_tasks(self):
        """Dispatch queued tasks to the least-loaded agent while running."""
        while self.running:
            try:
                pending_get = self.task_queue.get()
                task = await asyncio.wait_for(pending_get, timeout=1.0)

                agent = self._select_least_loaded_agent(task)

                if not agent:
                    logger.warning(f"No available agent for task {task['id']}")
                    continue

                # Hand the task over, then account for it.
                await agent.submit_task(task['id'], task)
                self.agent_loads[agent] += 1
                logger.info(f"Task {task['id']} dispatched to {agent.__class__.__name__} (load: {self.agent_loads[agent]})")

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"Error dispatching task: {e}")

    def _select_least_loaded_agent(self, task: Dict[str, Any]) -> Any:
        """Return the registered agent with the smallest load.

        Ties go to the agent registered first. ``task`` is not consulted.
        Returns ``None`` when no agent is registered.
        """
        if not self.agents:
            return None

        return min(self.agents, key=lambda candidate: self.agent_loads[candidate])

    def update_agent_load(self, agent, delta: int):
        """Add ``delta`` to the load of ``agent``; untracked agents are ignored."""
        if agent not in self.agent_loads:
            return

        self.agent_loads[agent] += delta
        logger.info(f"Agent {agent.__class__.__name__} load updated to {self.agent_loads[agent]}")

# 使用示例
async def main():
    """Demo: spread ten code-generation tasks over three agents."""
    dispatcher = LoadBalancedDispatcher({})

    # Create three identical agents, start them all, then register them.
    pool = [AsyncCodeGenerationAgent({}) for _ in range(3)]
    for member in pool:
        await member.start()
    for member in pool:
        dispatcher.register_agent(member)

    await dispatcher.start()

    # Queue ten tasks for dispatch.
    for i in range(10):
        job = {
            'id': f'task{i}',
            'type': 'code_generation',
            'name': f'Task {i}',
            'function_name': f'func{i}'
        }
        await dispatcher.submit_task(job)

    # Give the agents time to work through the tasks.
    await asyncio.sleep(5)

    # Shut down the dispatcher first, then the agents.
    await dispatcher.stop()
    for member in pool:
        await member.stop()

if __name__ == '__main__':
    asyncio.run(main())

```

> **任务分发特点**:
> - 负载均衡
> - 动态Agent选择
> - 负载监控
> - 任务路由
## 异步任务监控
### 1. 任务状态监控
示例:任务状态监控

用户请求:
"实现一个任务状态监控系统"

Claude Code 生成的代码:

```python

import asyncio
from typing import Dict, Any
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class TaskMonitor:
    """In-memory registry of task records plus an append-only event log.

    ``self.tasks`` maps task id -> record dict and ``self.events`` is a list
    of ``{'task_id', 'event', 'timestamp'}`` dicts in the order they
    occurred. All timestamps are naive datetimes from ``datetime.utcnow()``.
    """

    # Statuses that get their own counter in get_task_statistics(). Kept
    # separate from the result dict so a status that happens to equal
    # another key of that dict (e.g. 'total') is never counted into it.
    _COUNTED_STATUSES = ('pending', 'processing', 'completed', 'failed', 'timeout')

    def __init__(self):
        self.tasks = {}
        self.events = []

    def register_task(self, task_id: str, task_info: Dict[str, Any]):
        """Create (or replace) the record for ``task_id`` in status 'pending'.

        Entries of ``task_info`` are merged in last, so they override the
        generated fields of the same name ('id', 'status', 'created_at',
        'updated_at').
        """
        # One timestamp for both fields so a fresh record is self-consistent.
        now = datetime.utcnow()
        self.tasks[task_id] = {
            'id': task_id,
            'status': 'pending',
            'created_at': now,
            'updated_at': now,
            **task_info
        }

        self._log_event(task_id, 'registered')
        logger.info(f"Task {task_id} registered")

    def update_task_status(self, task_id: str, status: str, **kwargs):
        """Set the status of a registered task and merge ``kwargs`` into it.

        Unknown task ids are logged as a warning and otherwise ignored.
        """
        record = self.tasks.get(task_id)
        if record is None:
            logger.warning(f"Task {task_id} not found")
            return

        record['status'] = status
        record['updated_at'] = datetime.utcnow()
        record.update(kwargs)

        self._log_event(task_id, f'status_changed_to_{status}')
        logger.info(f"Task {task_id} status updated to {status}")

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """Return the record for ``task_id``, or ``{}`` if it is unknown."""
        return self.tasks.get(task_id, {})

    def get_all_tasks(self) -> Dict[str, Any]:
        """Return the task id -> record mapping (the live dict, not a copy)."""
        return self.tasks

    def get_tasks_by_status(self, status: str) -> Dict[str, Any]:
        """Return the records whose status equals ``status``, keyed by id."""
        return {
            task_id: task
            for task_id, task in self.tasks.items()
            if task['status'] == status
        }

    def get_task_statistics(self) -> Dict[str, Any]:
        """Return the number of tasks overall and per known status.

        The result has the keys 'total', 'pending', 'processing',
        'completed', 'failed' and 'timeout'. Tasks in any other status
        count towards 'total' only.
        """
        stats = {'total': len(self.tasks)}
        stats.update(dict.fromkeys(self._COUNTED_STATUSES, 0))

        for task in self.tasks.values():
            status = task['status']
            if status in self._COUNTED_STATUSES:
                stats[status] += 1

        return stats

    def _log_event(self, task_id: str, event: str):
        """Append a timestamped event for ``task_id`` to the event log."""
        self.events.append({
            'task_id': task_id,
            'event': event,
            'timestamp': datetime.utcnow()
        })

    def get_task_events(self, task_id: str) -> list:
        """Return the events recorded for ``task_id``, oldest first."""
        return [
            event for event in self.events
            if event['task_id'] == task_id
        ]

# 使用示例
async def main():
    """Demo: register three tasks, move them through statuses and report."""
    monitor = TaskMonitor()

    # Register the tasks.
    registrations = (
        ('task1', {'name': 'Task 1', 'type': 'code_generation'}),
        ('task2', {'name': 'Task 2', 'type': 'code_review'}),
        ('task3', {'name': 'Task 3', 'type': 'test_generation'}),
    )
    for task_id, info in registrations:
        monitor.register_task(task_id, info)

    # Each task goes to 'processing', then to its final status a second later.
    transitions = (
        ('task1', 'completed', {'result': 'success'}),
        ('task2', 'failed', {'error': 'validation error'}),
        ('task3', 'timeout', {}),
    )
    for task_id, final_status, extra in transitions:
        monitor.update_task_status(task_id, 'processing')
        await asyncio.sleep(1)
        monitor.update_task_status(task_id, final_status, **extra)

    # Show the individual task records.
    reports = (
        ("Task 1 status:", 'task1'),
        ("Task 2 status:", 'task2'),
        ("Task 3 status:", 'task3'),
    )
    for label, task_id in reports:
        print(label, monitor.get_task_status(task_id))

    # Show the aggregate counters.
    stats = monitor.get_task_statistics()
    print("\nTask Statistics:")
    for key in stats:
        print(f"  {key}: {stats[key]}")

    # Show the event history of the first task.
    print("\nTask 1 events:")
    history = monitor.get_task_events('task1')
    for event in history:
        print(f"  {event['timestamp']}: {event['event']}")

if __name__ == '__main__':
    asyncio.run(main())

```

任务监控特点:

  • 实时状态跟踪
  • 任务统计
  • 事件记录
  • 状态查询

标记本节教程为已读

记录您的学习进度,方便后续查看。